package com.bw;

import com.bw.gmall.realtime.common.base.BaseSqlApp;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.SQLUtil;
import com.util.SplitFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * DWS job: counts search keywords per tumbling window from the DWD page-log
 * topic and writes the aggregates to Doris.
 *
 * <p>Pipeline: Kafka page log -&gt; filter search events -&gt; split the search
 * phrase into keywords via the {@code SplitFunction} UDTF -&gt; 5s tumbling-window
 * count per keyword -&gt; Doris sink table.
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSqlApp {
    public static void main(String[] args) {
        // ckAndGroupId, parallelism, port — signature defined by BaseSqlApp.
        new DwsTrafficSourceKeywordPageViewWindow().start(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW, 1, 10021);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String groupId) {
        // 1. Read the page-log topic. Event time comes from `ts` (epoch millis)
        //    with a 5-second out-of-orderness watermark.
        //    BUGFIX: the second argument of getKafkaSourceSQL is the consumer
        //    group id — previously the topic name was passed twice, so every
        //    instance of this app shared a group named after the topic.
        tableEnv.executeSql("create table page_log(\n" +
                "  page map<String,String>,\n" +
                "  common map<String,String>,\n" +
                "  ts bigint,\n" +
                " et as to_timestamp_ltz(ts, 3), " +
                " watermark for et as et - interval '5' second " +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));

        // 2. Keep only keyword searches: the user arrived from the search page
        //    and the page item is of type 'keyword'. `item` holds the raw
        //    search phrase.
        Table table = tableEnv.sqlQuery("select \n" +
                "`page`['item'] item,\n" +
                "et\n" +
                "from page_log\n" +
                "where `page`['last_page_id'] = 'search'\n" +
                "and `page`['item_type'] = 'keyword'");

        // Register the word-splitting UDTF and the filtered view.
        tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        tableEnv.createTemporaryView("word", table);

        // 3. Explode each search phrase into individual keywords
        //    (lateral table join against the table function).
        Table table1 = tableEnv.sqlQuery("select \n" +
                "  item,\n" +
                "  word,\n" +
                "  et\n" +
                "from word,lateral table(SplitFunction(item))");

        tableEnv.createTemporaryView("words", table1);

        // 4. Windowed aggregation: count occurrences of each keyword inside a
        //    5-second tumbling event-time window, using the windowing TVF
        //    syntax (replaces the legacy TUMBLE group-window form).
        Table result = tableEnv.sqlQuery("select " +
                " date_format(window_start, 'yyyy-MM-dd HH:mm:ss') stt, " +
                " date_format(window_end, 'yyyy-MM-dd HH:mm:ss') edt, " +
                " date_format(now(), 'yyyyMMdd') cur_date, " +
                " word keyword," +
                " count(*) keyword_count " +
                "from table( tumble(table words, descriptor(et), interval '5' second ) ) " +
                "group by window_start, window_end, word ");

        // 5. Doris sink. NOTE: the Doris connector requires checkpointing to
        //    be enabled (assumed to be configured in BaseSqlApp.start).
        tableEnv.executeSql("CREATE TABLE dws_traffic_source_keyword_page_view_window (\n" +
                " stt  STRING,\n" +
                " edt STRING,\n" +
                " cur_date STRING,\n" +
                " keyword  STRING,\n" +
                " keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        // BUGFIX: the write-out was commented out, leaving the job with no
        // sink — nothing was ever executed. Stream the aggregates into Doris.
        result.insertInto("dws_traffic_source_keyword_page_view_window").execute();
    }
}
